﻿using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Singer.BackgroundJob.RabbitMQ.BackgroundJobLogs;
using Singer.BackgroundJob.RabbitMQ.Contracts;
using Singer.Core;
using Singer.Middleware.RabbitMQ;
using System.Reflection;
using System.Text;

namespace Singer.BackgroundJob.RabbitMQ;

/// <summary>
/// 消息队列触发后台任务
/// </summary>
public class BackgroundJob : BackgroundService
{
    private readonly BackgroundJobServiceFactory ServiceFactory;
    private readonly CancellationTokenSource StoppingCts;

    private readonly IServiceProvider _appRootService;
    private readonly BackgroundJobOptions _options;
    private readonly ILogger<BackgroundJob> _logger;
    private readonly IRabbitMQConnectionPool _rabbitMQConnectionPool;
    private readonly BackgroundJobConsumerCollection _consumers;

    /// <summary>
    /// 消息队列触发后台任务 基类
    /// </summary>
    public BackgroundJob(
        IServiceProvider appRootService,
        ILogger<BackgroundJob> logger,
        IRabbitMQConnectionPool rabbitMQConnectionPool,
        BackgroundJobConsumerCollection consumers,
        IOptions<BackgroundJobOptions> options)
    {
        StoppingCts = new CancellationTokenSource();
        _appRootService = appRootService;
        _logger = logger;
        _rabbitMQConnectionPool = rabbitMQConnectionPool;
        _consumers = consumers;
        _options = options.Value;
        var connection = _rabbitMQConnectionPool.Get(_options.ConnectionName);
        ServiceFactory = new BackgroundJobServiceFactory(appRootService);
        using IModel channel = connection.CreateModel();
        channel.ExchangeDeclare(_options.ExchangeName, ExchangeType.Direct, true, false, null);
    }

    /// <summary>
    /// 任务执行 逻辑
    /// </summary>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        IConnection connection = _rabbitMQConnectionPool.Get();
        foreach (var consumerContext in _consumers)
        {
            IModel channel = connection.CreateModel();
            // 声明队列
            channel.QueueDeclare(
                queue: consumerContext.QueueName,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);
            // 队列、交换机 绑定
            channel.QueueBind(
                queue: consumerContext.QueueName,
                exchange: _options.ExchangeName,
                routingKey: consumerContext.RoutingKey,
                arguments: null);
            // 声明消费者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += async (sender, e) => await Consumer_Received(sender, e, consumerContext); // 消息消费事件
            // prefetchSize : 消费者预取消息的总大小限制 (字节)，为0时不做限制。
            // prefetchCount : 消费者预取消息的数量，当消费者正在处理消息时，RabbitMQ服务器不会向该消费者发送超过这个数量的消息。
            // global：是否全局配置，如果为true，会应用于当前连接的所有信道的所有消费者；如果为false，仅应用于当前信道上的所有消费者。
            channel.BasicQos(prefetchSize: 0, prefetchCount: _options.ConsumerQos, global: false);
            // autoAck：是否自动应答
            // true：自动确认，消费者接收到消息时，就自动确认接收，消息就会删除。
            // false：手动确认，需要手动调用 channel.BasicAck 方法确认接收消息。  channel.BasicNack 手动拒绝消息，消息重新回到队列。
            // 在设置手动应答时，消费者消费消息期间，如果没手动ack，该消息是不会被其他消费者消费的。
            // 可以利用手动应答机制，在消息处理成功时确认接收消息。消息处理失败时拒绝接收消息，重新回到队列重新消费。
            channel.BasicConsume(queue: consumerContext.QueueName, autoAck: false, consumer: consumer);
            consumerContext.InitConsumer(consumer, channel);
        }
        CancelConsumerInit(connection); // 初始化 处理任务取消事件 的消费者
        return Task.CompletedTask;
    }

    /// <summary>
    /// 任务停止 逻辑
    /// </summary>
    /// <param name="cancellationToken">任务取消令牌</param>
    public override Task StopAsync(CancellationToken cancellationToken)
    {
        StoppingCts.Cancel();
        return base.StopAsync(cancellationToken);
    }

    /// <summary>
    /// 消费者消费消息事件
    /// </summary>
    /// <param name="consumerContext">消费者信息模型</param>
    private async Task Consumer_Received(object? sender, BasicDeliverEventArgs e, ConsumerContext consumerContext)
    {
        string messageJson = Encoding.UTF8.GetString(e.Body.ToArray());
        BackgroundJobLog jobLog = null;
        IServiceScope serviceScope = null;
        bool retry = false; // 是否重启
        string errorMsg = "";
        try
        {
            // 解析消息参数
            BackgroundJobMessage? message = JsonUtils.ToModel<BackgroundJobMessage>(messageJson);
            if (message == null)
                throw new Exception("消息参数为空！");
            if (string.IsNullOrWhiteSpace(message.JobLogId))
                throw new Exception("后台任务消息参数异常：id为空");

            // 根据消息参数，创建DI服务作用域，获取消息类型对应的任务执行器服务
            serviceScope = await ServiceFactory.CreateServiceScope(message);
            var executor = serviceScope.ServiceProvider.GetService(consumerContext.ExecutorType);
            if (executor == null)
                throw new Exception("任务执行器服务获取失败！");

            // 后台任务日志
            BackgroundJobLogManager? backgroundJobLogManager = serviceScope.ServiceProvider.GetService<BackgroundJobLogManager>();
            if (backgroundJobLogManager == null)
                throw new Exception("后台任务管理服务获取失败！");
            jobLog = await backgroundJobLogManager.GetAsync(message.JobLogId);
            if (jobLog == null)
                throw new Exception("后台任务日志不存在！");

            // 根据根后台任务取消令牌，为每个后台任务执行器 单独创建 子任务取消令牌。父任务取消时，会取消所有子任务
            CancellationTokenSource executorStoppingCts = CancellationTokenSource.CreateLinkedTokenSource(StoppingCts.Token);
            consumerContext.AddJob(jobLog.Id, executorStoppingCts);

            bool success = false; // 任务是否执行成功
            try
            {
                await backgroundJobLogManager.InitAsync(jobLog); // 任务执行器初始化

                // 任务执行器 初始化方法
                string initMethodName = nameof(BackgroundJobExecutorBase.Init);
                MethodInfo? initMethod = consumerContext.ExecutorType.BaseType?.GetMethod(initMethodName);
                if (initMethod == null)
                    throw new Exception($"任务执行器缺少方法'{initMethodName}'");
                initMethod.Invoke(executor, [jobLog, executorStoppingCts]);

                // 任务执行器 执行方法（异步）
                string executeMethodName = nameof(BackgroundJobExecutorBase.ExecuteAsync);
                MethodInfo? executeMethod = consumerContext.ExecutorType.GetMethod(executeMethodName);
                if (executeMethod == null)
                    throw new Exception($"任务执行器缺少方法'{executeMethodName}'");

                await backgroundJobLogManager.RunningAsync(jobLog); // 任务开始执行
                Task? task = (Task?)executeMethod.Invoke(executor, null);
                if (task == null)
                    throw new Exception($"任务执行器方法'{executeMethodName}'解析出错");
                await task;
                consumerContext.Channel?.BasicAck(e.DeliveryTag, false); // 手动确认接收消息
                success = true;
                if (executorStoppingCts?.Token.IsCancellationRequested == false)
                    await backgroundJobLogManager.SuccessAsync(jobLog);
            }
            catch (Exception ex)
            {
                errorMsg = "【异常信息】：" + ex.Message;
                retry = await backgroundJobLogManager.FailAsync(jobLog, ex.Message);
                // if (!retry)  【新】先丢弃消息，若重试，重新推一条一样的消息到队列尾部
                consumerContext.Channel?.BasicNack(e.DeliveryTag, multiple: false, requeue: false); // requeue：false丢弃消息 true重新回到队列
            }
            finally
            {
                consumerContext.RemoveJob(jobLog.Id);
                serviceScope.Dispose();
            }
            string successStr = success ? "成功" : "失败";
            string retryStr = "";
            if (!success)
                retryStr = retry ? "【即将重新消费消息】" : "【消息丢弃】";
            _logger.LogInformation($"【RabbitMQ后台任务】【消息消费成功】【任务执行{success}】{retryStr} 消费者：'{consumerContext.ConsumerName}'；消息：'{messageJson}'。{errorMsg}");
            if (retry)
                _ = FailReturnQueueAsync(e.RoutingKey, messageJson);
        }
        catch (Exception ex)
        {
            consumerContext.Channel?.BasicNack(e.DeliveryTag, multiple: false, requeue: false);
            _logger.LogError($"【RabbitMQ后台任务】【消息消费出错】【消息已丢弃】消费者：'{consumerContext.ConsumerName}'；消息：'{messageJson}'。异常信息：{ex.Message}。");
        }
        finally
        {
            serviceScope?.Dispose();
        }
    }

    /// <summary>
    /// 任务失败后，等待一段时间后，重新回到队列，重新执行
    /// 【新】不阻塞消费者，不阻塞其他消息。失败等待重试前，先丢弃消息，等待一定时间重新推一条一样的消息到队列尾部。
    /// </summary>
    private async Task FailReturnQueueAsync(string routingKey, string messageJson)
    {
        await Task.Delay(TimeSpan.FromSeconds(_options.FailedRetryDelaySeconds));
        IConnection connection = _rabbitMQConnectionPool.Get();
        using IModel channel = connection.CreateModel();
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2; // 指定消息持久化。即服务重启之后依然保留。
        channel.BasicPublish(_options.ExchangeName, routingKey, properties, Encoding.UTF8.GetBytes(messageJson)); // 重新推消息到队列尾部
    }

    /// <summary>
    /// 任务失败后，等待一段时间后，重新回到队列，重新执行
    /// 【已废弃】严重影响消息消费速度。等待重试期间，消费者会阻塞。并且消息重新回到队列是在队列头部，阻塞其他消息。
    /// </summary>
    private async Task FailReturnQueueAsync(ConsumerContext consumerContext, ulong deliveryTag)
    {
        await Task.Delay(TimeSpan.FromSeconds(_options.FailedRetryDelaySeconds));
        consumerContext.Channel?.BasicNack(deliveryTag, multiple: false, requeue: true);
        /* 该方法会严重影响消息消费速度
           消费者消费当前消息，在当前消息未确认或丢弃时，会阻塞当前消费者，与消费者预取消息数量参数有关。
           当消费者预取数量为1时，同时消费消息数量为1，当前消息未确认接受或丢弃时，不会消费其他消息。
           并且回到队列也是队列头部，下次还会继续消费该消息，会阻塞其他消息。
           解决办法：重新回到队列，不使用BasicNack，而是重新创建一条一样的消息，推送到队列尾部，等待重新消费。
         */
    }


    #region 任务取消处理
    /// <summary>
    /// 初始化 处理任务取消事件 的消费者
    /// </summary>
    private void CancelConsumerInit(IConnection connection)
    {
        IModel cancellTaskChannel = connection.CreateModel();
        cancellTaskChannel.QueueDeclare(
            queue: Contants.DEFAULT_QUEUENAME_CANCEL_TASK,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);
        // 队列、交换机 绑定
        cancellTaskChannel.QueueBind(
            queue: Contants.DEFAULT_QUEUENAME_CANCEL_TASK,
            exchange: _options.ExchangeName,
            routingKey: Contants.DEFAULT_ROUTINGKEY_CANCEL_TASK,
            arguments: null);

        // 声明消费者
        EventingBasicConsumer consumer = new EventingBasicConsumer(cancellTaskChannel);
        consumer.Received += CancelConsumer_Received;
        cancellTaskChannel.BasicConsume(Contants.DEFAULT_QUEUENAME_CANCEL_TASK, autoAck: true, consumer: consumer); // 绑定取消队列消费者，设置自动应答模式
    }

    /// <summary>
    /// 任务取消队列 消息消费事件
    /// </summary>
    public void CancelConsumer_Received(object? sender, BasicDeliverEventArgs e)
    {
        string messageJson = Encoding.UTF8.GetString(e.Body.ToArray());
        try
        {
            BackgroundJobCancelTaskMessage? message = JsonUtils.ToModel<BackgroundJobCancelTaskMessage>(messageJson);
            if (message == null)
                throw new Exception("消息参数为空！");
            if (string.IsNullOrWhiteSpace(message.JobLogId))
                throw new Exception("后台任务取消队列消息参数异常：id为空");
            var consumerContext = _consumers.FirstOrDefault(x => x.CurrentJobExecutorStopptingCtsDic.Keys.Contains(message.JobLogId));
            if (consumerContext == null)
                throw new Exception($"未找到运行中的id为'{message.JobLogId}'的任务");
            consumerContext.CurrentJobExecutorStopptingCtsDic[message.JobLogId].Cancel();
            _logger.LogInformation($"【RabbitMQ后台任务】【任务取消成功】消息：'{messageJson}'。");
        }
        catch (Exception ex)
        {
            _logger.LogError($"【RabbitMQ后台任务】【任务取消出错】消息：'{messageJson}'。异常信息：{ex.Message}。");
        }
    }
    #endregion

    public override void Dispose()
    {
        _consumers.Dispose();
        base.Dispose();
    }
}
